{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Delta Lake on Hops\n",
    "\n",
    "This notebook contains some examples of how you can use Delta Lake on Hops.\n",
    "\n",
    "Delta Lake is an open-source storage layer that brings ACID transactions to Apache Spark and big data workloads. \n",
    "\n",
    "Key Features: \n",
    "\n",
    "- **ACID Transactions**: Data lakes typically have multiple data pipelines reading and writing data concurrently, and without transactions data engineers have to go through a tedious process to ensure data integrity. Delta Lake brings ACID transactions to your data lakes. It provides serializability, the strongest isolation level.\n",
    "\n",
    "- **Scalable Metadata Handling**: In big data, even the metadata itself can be “big data”. Delta Lake treats metadata just like data, leveraging Spark’s distributed processing power to handle all its metadata. As a result, Delta Lake can handle petabyte-scale tables with billions of partitions and files with ease.\n",
    "\n",
    "- **Time Travel (data versioning)**: Delta Lake provides snapshots of data enabling developers to access and revert to earlier versions of data for audits, rollbacks or to reproduce experiments.\n",
    "\n",
    "- **Open Format**: All data in Delta Lake is stored in Apache Parquet format enabling Delta Lake to leverage the efficient compression and encoding schemes that are native to Parquet.\n",
    "\n",
    "- **Unified Batch and Streaming Source and Sink**: A table in Delta Lake is both a batch table, as well as a streaming source and sink. Streaming data ingest, batch historic backfill, and interactive queries all just work out of the box.\n",
    "\n",
    "- **Schema Enforcement**: Delta Lake provides the ability to specify your schema and enforce it. This helps ensure that the data types are correct and required columns are present, preventing bad data from causing data corruption.\n",
    "\n",
    "- **Schema Evolution**: Big data is continuously changing. Delta Lake enables you to make changes to a table schema that can be applied automatically, without the need for cumbersome DDL.\n",
    "\n",
    "- **100% Compatible with Apache Spark API**: Developers can use Delta Lake with their existing data pipelines with minimal change as it is fully compatible with Spark, the commonly used big data processing engine.\n",
    "\n",
    "- **Audit History**: The Delta Lake transaction log records details about every change made to the data, providing a full audit trail of the changes.\n",
    "\n",
    "- **Full DML Support**: Delta Lake supports standard DML, including UPDATE, DELETE and MERGE INTO, giving developers more control over their big datasets."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "import io.hops.util.Hops\n",
      "import org.apache.spark.api.java.JavaSparkContext\n",
      "import org.apache.spark.sql.DataFrameWriter\n",
      "import org.apache.spark.sql.Dataset\n",
      "import org.apache.spark.sql.Row\n",
      "import org.apache.spark.sql.SaveMode\n",
      "import org.apache.spark.sql.SparkSession\n",
      "import java.sql.Date\n",
      "import java.sql.Timestamp\n",
      "import org.apache.spark.sql._\n",
      "import spark.implicits._\n",
      "import org.apache.spark.sql.types._\n"
     ]
    }
   ],
   "source": [
    "import io.hops.util.Hops\n",
    "import org.apache.spark.api.java.JavaSparkContext\n",
    "import org.apache.spark.sql.DataFrameWriter\n",
    "import org.apache.spark.sql.Dataset\n",
    "import org.apache.spark.sql.Row\n",
    "import org.apache.spark.sql.SaveMode\n",
    "import org.apache.spark.sql.SparkSession\n",
    "import java.sql.Date\n",
    "import java.sql.Timestamp\n",
    "import org.apache.spark.sql._\n",
    "import spark.implicits._\n",
    "import org.apache.spark.sql.types._"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "bulkInsertData: Seq[org.apache.spark.sql.Row] = List([1,2019-03-02,0.4151,Sweden], [2,2019-05-01,1.2151,Ireland], [3,2019-08-06,0.2151,Belgium], [4,2019-08-06,0.8151,Russia])\n",
      "schema: List[org.apache.spark.sql.types.StructField] = List(StructField(id,IntegerType,true), StructField(date,DateType,true), StructField(value,FloatType,true), StructField(country,StringType,true))\n",
      "bulkInsertDf: org.apache.spark.sql.DataFrame = [id: int, date: date ... 2 more fields]\n",
      "+---+----------+------+-------+\n",
      "| id|      date| value|country|\n",
      "+---+----------+------+-------+\n",
      "|  1|2019-03-02|0.4151| Sweden|\n",
      "|  2|2019-05-01|1.2151|Ireland|\n",
      "|  3|2019-08-06|0.2151|Belgium|\n",
      "|  4|2019-08-06|0.8151| Russia|\n",
      "+---+----------+------+-------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "val bulkInsertData = Seq(\n",
    "    Row(1, Date.valueOf(\"2019-03-02\"), 0.4151f, \"Sweden\"),\n",
    "    Row(2, Date.valueOf(\"2019-05-01\"), 1.2151f, \"Ireland\"),\n",
    "    Row(3, Date.valueOf(\"2019-08-06\"), 0.2151f, \"Belgium\"),\n",
    "    Row(4, Date.valueOf(\"2019-08-06\"), 0.8151f, \"Russia\")\n",
    ")\n",
    "val schema = List(\n",
    "  StructField(\"id\", IntegerType, true),\n",
    "  StructField(\"date\", DateType, true),\n",
    "  StructField(\"value\", FloatType, true),\n",
    "  StructField(\"country\", StringType, true)\n",
    ")\n",
    "val bulkInsertDf = spark.createDataFrame(\n",
    "  spark.sparkContext.parallelize(bulkInsertData),\n",
    "  StructType(schema)\n",
    ")\n",
    "bulkInsertDf.show(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To create a Delta dataset, simply set the data format to \"delta\":"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "bulkInsertDf.write.format(\"delta\").save(s\"hdfs:///Projects/${Hops.getProjectName}/Resources/hello_delta\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "A Delta dataset keeps a commit log to support ACID transactions.\n",
    "\n",
    "![Delta Dataset](./../images/delta_dataset.png \"Delta Dataset\")"
   ]
  },
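  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The commit log lives in a `_delta_log` directory inside the dataset path. As a minimal sketch, assuming the dataset written above exists and the notebook's `spark` session is available, the log files can be listed with the Hadoop `FileSystem` API (the `logDir` and `fs` names are only illustrative):\n",
    "\n",
    "```scala\n",
    "import io.hops.util.Hops\n",
    "import org.apache.hadoop.fs.Path\n",
    "\n",
    "// Delta stores its commit files under the _delta_log subdirectory\n",
    "val logDir = new Path(s\"hdfs:///Projects/${Hops.getProjectName}/Resources/hello_delta/_delta_log\")\n",
    "val fs = logDir.getFileSystem(spark.sparkContext.hadoopConfiguration)\n",
    "\n",
    "// Print the file names in the log directory, sorted by name\n",
    "fs.listStatus(logDir).map(_.getPath.getName).sorted.foreach(println)\n",
    "```\n",
    "\n",
    "Each committed write is expected to add a new zero-padded `.json` commit file to this listing."
   ]
  },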
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "val df = spark.read.format(\"delta\").load(s\"hdfs:///Projects/${Hops.getProjectName}/Resources/hello_delta\")\n",
    "df.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Delta also provides time-travel functionality that lets you inspect the state of a dataset at a particular version or point in time:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "df: org.apache.spark.sql.DataFrame = [id: int, date: date ... 2 more fields]\n",
      "+---+----------+------+-------+\n",
      "| id|      date| value|country|\n",
      "+---+----------+------+-------+\n",
      "|  3|2019-08-06|0.2151|Belgium|\n",
      "|  4|2019-08-06|0.8151| Russia|\n",
      "|  1|2019-03-02|0.4151| Sweden|\n",
      "|  2|2019-05-01|1.2151|Ireland|\n",
      "+---+----------+------+-------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "val df = spark.read.format(\"delta\").option(\"versionAsOf\", 0).load(s\"hdfs:///Projects/${Hops.getProjectName}/Resources/hello_delta\")\n",
    "df.show()"
   ]
  },
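  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Besides `versionAsOf`, the Delta reader also accepts a `timestampAsOf` option. The sketch below assumes the same dataset path as above; the timestamp string is a hypothetical value and has to fall within the range of the table's commit timestamps, otherwise the read fails:\n",
    "\n",
    "```scala\n",
    "import io.hops.util.Hops\n",
    "\n",
    "val path = s\"hdfs:///Projects/${Hops.getProjectName}/Resources/hello_delta\"\n",
    "\n",
    "// Hypothetical timestamp: replace it with one covered by the table's history\n",
    "val dfAtTime = spark.read\n",
    "  .format(\"delta\")\n",
    "  .option(\"timestampAsOf\", \"2019-09-01 12:00:00\")\n",
    "  .load(path)\n",
    "dfAtTime.show()\n",
    "```\n",
    "\n",
    "Reading by timestamp is convenient for audits, where the question is usually what the data looked like at a given moment rather than at a given version number."
   ]
  },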
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Delta supports upserts and overwrites:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "overwriteData: Seq[org.apache.spark.sql.Row] = List([1,2019-06-30,0.4151,Sweden], [2,2019-05-01,1.2151,Ireland], [3,2017-08-06,0.2151,Belgium], [4,2019-08-06,0.8151,Russia])\n",
      "overwriteDataDf: org.apache.spark.sql.DataFrame = [id: int, date: date ... 2 more fields]\n",
      "+---+----------+------+-------+\n",
      "| id|      date| value|country|\n",
      "+---+----------+------+-------+\n",
      "|  1|2019-06-30|0.4151| Sweden|\n",
      "|  2|2019-05-01|1.2151|Ireland|\n",
      "|  3|2017-08-06|0.2151|Belgium|\n",
      "|  4|2019-08-06|0.8151| Russia|\n",
      "+---+----------+------+-------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "val overwriteData = Seq(\n",
    "    Row(1, Date.valueOf(\"2019-06-30\"), 0.4151f, \"Sweden\"),\n",
    "    Row(2, Date.valueOf(\"2019-05-01\"), 1.2151f, \"Ireland\"),\n",
    "    Row(3, Date.valueOf(\"2017-08-06\"), 0.2151f, \"Belgium\"),\n",
    "    Row(4, Date.valueOf(\"2019-08-06\"), 0.8151f, \"Russia\")\n",
    ")\n",
    "val overwriteDataDf = spark.createDataFrame(\n",
    "  spark.sparkContext.parallelize(overwriteData),\n",
    "  StructType(schema)\n",
    ")\n",
    "overwriteDataDf.show(5)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [],
   "source": [
    "overwriteDataDf.write.format(\"delta\").mode(\"overwrite\").save(s\"hdfs:///Projects/${Hops.getProjectName}/Resources/hello_delta\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "df: org.apache.spark.sql.DataFrame = [id: int, date: date ... 2 more fields]\n",
      "+---+----------+------+-------+\n",
      "| id|      date| value|country|\n",
      "+---+----------+------+-------+\n",
      "|  1|2019-06-30|0.4151| Sweden|\n",
      "|  2|2019-05-01|1.2151|Ireland|\n",
      "|  3|2017-08-06|0.2151|Belgium|\n",
      "|  4|2019-08-06|0.8151| Russia|\n",
      "+---+----------+------+-------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "val df = spark.read.format(\"delta\").load(s\"hdfs:///Projects/${Hops.getProjectName}/Resources/hello_delta\")\n",
    "df.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "df: org.apache.spark.sql.DataFrame = [id: int, date: date ... 2 more fields]\n",
      "+---+----------+------+-------+\n",
      "| id|      date| value|country|\n",
      "+---+----------+------+-------+\n",
      "|  3|2019-08-06|0.2151|Belgium|\n",
      "|  4|2019-08-06|0.8151| Russia|\n",
      "|  1|2019-03-02|0.4151| Sweden|\n",
      "|  2|2019-05-01|1.2151|Ireland|\n",
      "+---+----------+------+-------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "val df = spark.read.format(\"delta\").option(\"versionAsOf\", 0).load(s\"hdfs:///Projects/${Hops.getProjectName}/Resources/hello_delta\")\n",
    "df.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "df: org.apache.spark.sql.DataFrame = [id: int, date: date ... 2 more fields]\n",
      "+---+----------+------+-------+\n",
      "| id|      date| value|country|\n",
      "+---+----------+------+-------+\n",
      "|  1|2019-06-30|0.4151| Sweden|\n",
      "|  2|2019-05-01|1.2151|Ireland|\n",
      "|  3|2017-08-06|0.2151|Belgium|\n",
      "|  4|2019-08-06|0.8151| Russia|\n",
      "+---+----------+------+-------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "val df = spark.read.format(\"delta\").option(\"versionAsOf\", 1).load(s\"hdfs:///Projects/${Hops.getProjectName}/Resources/hello_delta\")\n",
    "df.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To upsert data in a Delta dataset, use the **merge** primitive:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "upsertData: Seq[org.apache.spark.sql.Row] = List([5,2019-03-02,0.7921,Northern Ireland], [1,2019-05-01,1.151,Norway], [3,2019-08-06,0.999,Belgium], [6,2019-08-06,0.0151,France])\n",
      "upsertDf: org.apache.spark.sql.DataFrame = [id: int, date: date ... 2 more fields]\n",
      "+---+----------+------+----------------+\n",
      "| id|      date| value|         country|\n",
      "+---+----------+------+----------------+\n",
      "|  5|2019-03-02|0.7921|Northern Ireland|\n",
      "|  1|2019-05-01| 1.151|          Norway|\n",
      "|  3|2019-08-06| 0.999|         Belgium|\n",
      "|  6|2019-08-06|0.0151|          France|\n",
      "+---+----------+------+----------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "val upsertData = Seq(\n",
    "    Row(5, Date.valueOf(\"2019-03-02\"), 0.7921f, \"Northern Ireland\"), //Insert\n",
    "    Row(1, Date.valueOf(\"2019-05-01\"), 1.151f, \"Norway\"), //Update\n",
    "    Row(3, Date.valueOf(\"2019-08-06\"), 0.999f, \"Belgium\"), //Update\n",
    "    Row(6, Date.valueOf(\"2019-08-06\"), 0.0151f, \"France\") //Insert\n",
    ")\n",
    "val upsertDf = spark.createDataFrame(\n",
    "  spark.sparkContext.parallelize(upsertData),\n",
    "  StructType(schema)\n",
    ")\n",
    "upsertDf.show(5)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "import io.delta.tables._\n",
      "import org.apache.spark.sql.functions._\n"
     ]
    }
   ],
   "source": [
    "import io.delta.tables._\n",
    "import org.apache.spark.sql.functions._"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "deltaTable: io.delta.tables.DeltaTable = io.delta.tables.DeltaTable@10f145bb\n"
     ]
    }
   ],
   "source": [
    "val deltaTable = DeltaTable.forPath(s\"hdfs:///Projects/${Hops.getProjectName}/Resources/hello_delta\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "newData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, date: date ... 2 more fields]\n"
     ]
    }
   ],
   "source": [
    "val newData = upsertDf.as(\"newData\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {},
   "outputs": [],
   "source": [
    "// Upsert: update rows whose id matches an existing row, insert all others\n",
    "(deltaTable.as(\"oldData\")\n",
    "  .merge(\n",
    "    newData,\n",
    "    \"oldData.id = newData.id\")\n",
    "  .whenMatched\n",
    "  .update(Map(\"id\" -> col(\"newData.id\"), \"date\" -> col(\"newData.date\"), \n",
    "              \"value\" -> col(\"newData.value\"), \"country\" -> col(\"newData.country\")))\n",
    "  .whenNotMatched\n",
    "  .insert(Map(\"id\" -> col(\"newData.id\"), \"date\" -> col(\"newData.date\"), \n",
    "              \"value\" -> col(\"newData.value\"), \"country\" -> col(\"newData.country\")))\n",
    "  .execute())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "df: org.apache.spark.sql.DataFrame = [id: int, date: date ... 2 more fields]\n",
      "+---+----------+------+----------------+\n",
      "| id|      date| value|         country|\n",
      "+---+----------+------+----------------+\n",
      "|  5|2019-03-02|0.7921|Northern Ireland|\n",
      "|  2|2019-05-01|1.2151|         Ireland|\n",
      "|  3|2019-08-06| 0.999|         Belgium|\n",
      "|  6|2019-08-06|0.0151|          France|\n",
      "|  4|2019-08-06|0.8151|          Russia|\n",
      "|  1|2019-05-01| 1.151|          Norway|\n",
      "+---+----------+------+----------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "val df = spark.read.format(\"delta\").load(s\"hdfs:///Projects/${Hops.getProjectName}/Resources/hello_delta\")\n",
    "df.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "df: org.apache.spark.sql.DataFrame = [id: int, date: date ... 2 more fields]\n",
      "+---+----------+------+-------+\n",
      "| id|      date| value|country|\n",
      "+---+----------+------+-------+\n",
      "|  3|2019-08-06|0.2151|Belgium|\n",
      "|  4|2019-08-06|0.8151| Russia|\n",
      "|  1|2019-03-02|0.4151| Sweden|\n",
      "|  2|2019-05-01|1.2151|Ireland|\n",
      "+---+----------+------+-------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "val df = spark.read.format(\"delta\").option(\"versionAsOf\", 0).load(s\"hdfs:///Projects/${Hops.getProjectName}/Resources/hello_delta\")\n",
    "df.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "df: org.apache.spark.sql.DataFrame = [id: int, date: date ... 2 more fields]\n",
      "+---+----------+------+-------+\n",
      "| id|      date| value|country|\n",
      "+---+----------+------+-------+\n",
      "|  1|2019-06-30|0.4151| Sweden|\n",
      "|  2|2019-05-01|1.2151|Ireland|\n",
      "|  3|2017-08-06|0.2151|Belgium|\n",
      "|  4|2019-08-06|0.8151| Russia|\n",
      "+---+----------+------+-------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "val df = spark.read.format(\"delta\").option(\"versionAsOf\", 1).load(s\"hdfs:///Projects/${Hops.getProjectName}/Resources/hello_delta\")\n",
    "df.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "df: org.apache.spark.sql.DataFrame = [id: int, date: date ... 2 more fields]\n",
      "+---+----------+------+----------------+\n",
      "| id|      date| value|         country|\n",
      "+---+----------+------+----------------+\n",
      "|  5|2019-03-02|0.7921|Northern Ireland|\n",
      "|  2|2019-05-01|1.2151|         Ireland|\n",
      "|  3|2019-08-06| 0.999|         Belgium|\n",
      "|  6|2019-08-06|0.0151|          France|\n",
      "|  4|2019-08-06|0.8151|          Russia|\n",
      "|  1|2019-05-01| 1.151|          Norway|\n",
      "+---+----------+------+----------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "val df = spark.read.format(\"delta\").option(\"versionAsOf\", 2).load(s\"hdfs:///Projects/${Hops.getProjectName}/Resources/hello_delta\")\n",
    "df.show()"
   ]
  },
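  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The versions read above correspond to entries in the table's commit history. As a sketch, assuming the dataset path used throughout this notebook, `DeltaTable.history()` returns that history as a DataFrame; the three selected columns are a subset chosen here for readability:\n",
    "\n",
    "```scala\n",
    "import io.delta.tables.DeltaTable\n",
    "import io.hops.util.Hops\n",
    "\n",
    "val path = s\"hdfs:///Projects/${Hops.getProjectName}/Resources/hello_delta\"\n",
    "\n",
    "// One row per commit; show the version, commit time and operation type\n",
    "val history = DeltaTable.forPath(path).history()\n",
    "history.select(\"version\", \"timestamp\", \"operation\").show(false)\n",
    "```\n",
    "\n",
    "For this notebook the history is expected to contain one entry each for the initial write, the overwrite and the merge."
   ]
  },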
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Spark",
   "language": "",
   "name": "sparkkernel"
  },
  "language_info": {
   "codemirror_mode": "text/x-scala",
   "mimetype": "text/x-scala",
   "name": "scala",
   "pygments_lexer": "scala"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}